// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(clippy::uninlined_format_args)]

//! Implement a semaphore upon meta-service watch API and upsert API
//!
//! Example:
//!
//! ```rust,ignore
//! let client = MetaGrpcClient::try_create(/*..*/);
//! let acquired_guard = Semaphore::new_acquired(
//!         client,
//!         "your/semaphore/name/in/meta/service",
//!         2,                          // capacity: 2 acquired at most
//!         "id11",                     // ID of this acquirer
//!         Duration::from_secs(3)      // lease time
//! ).await?;
//!
//! acquired_guard.await;
//! // Released
//! ```
//!
//! Semaphore key structure:
//! ```text
//! <prefix>/meta          -> {capacity: 10} // this is not implemented in this version; 2025-03-19
//! <prefix>/queue/<seq_1>  -> {id: "<id_1>", value: 1}
//! <prefix>/queue/<seq_2>  -> {id: "<id_2>", value: 2}
//! <prefix>/queue/<seq_3>  -> {id: "<id_3>", value: 1}
//! <prefix>/seq_generator -> {}
//! ```
//!
//! - `<prefix>` is a user-defined string to identify a semaphore instance.
//! - `meta` key stores the capacity of the semaphore, representing the total available resources.
//! - `queue/*` directory contains semaphore entries, each identified by a sequence number, which
//!   is generated by `seq_generator`. Each entry has:
//!   - `id`: A user defined id.
//!   - `value`: The amount of resource this entry consumes.
//! - `seq_generator` is updated to generate a globally unique `seq` whenever a new semaphore entry
//!   is created.
//!
//!
//! ## Acquisition Process
//!
//! To acquire a semaphore, a client:
//! 1. Updates the `seq_generator` key to obtain a new `seq`
//! 2. Creates a [`PermitEntry`] with `acquirer_id` and inserts it at `queue/<seq>`, where `<seq>`
//!    is serialized to string in an order-preserving way.
//! 3. Submits a watch request on the `prefix` with `initial_flush=true` to receive all data under
//!    `<prefix>`.
//!    When receiving a key-value change event, it populates two local collections: `acquired` and `waiting`.
//!    Entries in both of the local queues are sorted by their `seq` values
//!
//! If the sum of `value` fields in `acquired` is less than `capacity`, new entries from the watch
//! stream are added to `acquired` until the sum equals or exceeds `capacity`. Any additional
//! entries go to `waiting`.  When an entry is removed from `acquired`, entries from `waiting` with
//! the smallest `seq` values are moved to `acquired` as capacity permits.
//!
//!
//! ## Atomic Insertion and Sequence Ordering
//!
//! When inserting a new entry, the client must verify the `seq_generator` seq within a transaction
//! to ensure entries are inserted in the correct order. This atomic verification is critical for
//! consistency.
//!
//! Without this check, two clients could reach contradictory conclusions about which entries are
//! acquired.  For example, consider entries `a:(seq=2,value=1)` and `b:(seq=1,value=1)` where
//! entry `b` has a lower sequence number and should have been inserted before `a`.
//!
//! If the semaphore capacity is 1:
//! - A client that first sees entry `a` would consider `a` as acquired and `b` as waiting
//! - Another client receiving both entries during `initial_flush` might see `b` before `a` and
//!   consider `b` as acquired
//!
//!
//! ## Implementation
//!
//! In this version (2025-03-19), the `meta` key is not implemented, and the semaphore capacity is
//! set directly by each client.  This design has a limitation: if different clients use different
//! capacity values, clients using smaller capacity values may experience starvation as they might
//! never see enough resources available according to their local capacity limit.  Future versions
//! will address this by storing capacity in the `meta` key to ensure consistency across all
//! clients.
//!
//!
//! ## Illustration
//!
//! The following diagram illustrates the internal components of the semaphore and the process of
//! acquiring a semaphore.
//!
//! -  [`Semaphore`] instance spawns a background task KV-Change-Subscriber to watch the kv-change
//!    events from the meta-service `(1)`. This subscriber monitors all entries under the semaphore's
//!    prefix and maintains the state of acquired and waiting entries. The semaphore then creates an
//!    [`Acquirer`] instance to register a semaphore entry and manage the acquisition process.
//!
//! -  [`Acquirer`] performs the following steps:
//!    1. Generates a new sequence number by updating the `seq_generator` key in the meta-service `(2)`.
//!    2. Creates a semaphore entry with this sequence number as key and inserts it into the `queue` directory
//!       and verifies the `seq_generator` key hasn't changed during insertion to ensure consistency.
//!       These two steps are performed in a transaction. `(3)`.
//!    3. Spawns a lease extender task to periodically refresh the TTL of the semaphore entry.  `(4)`.
//!    4. Waits for an `Acquired` event from the KV-Change-Subscriber, indicating the semaphore
//!       has been acquired. Returns a [`Permit`] to the client when acquisition is
//!       successful.`(5)`.
//!
//! -  [`Permit`] manages the lifecycle of the acquired semaphore:
//!    -  It contains two cancel channels: one for the lease extender task and one for the KV-Change-Subscriber
//!    -  It automatically cancel the two tasks when dropped.
//!    -  It implements `Future` that gets Ready with `Ok(())` if the semaphore entry is removed from
//!       the meta-service (either intentionally or due to TTL expiration).
//!       It gets Ready with an error [`ConnectionClosed`] if the stream from KV-Change-Subscriber is closed.
//!
//!    **Applications should poll the [`Permit`] as a Future to detect when the semaphore is released**
//! ```text
//! | Semaphore -> spawn()-. (1)
//! |     |                |
//! |     |                |
//! |     |                v                                watch Stream
//! |     |           .->o KV-Change-Subscriber (task) <--------------------------------------.
//! |     |           |  .-[acquired]                                                         |
//! |     |           |  | [waiting]                                                          |
//! |     |           |  |                                                                    |
//! |     |           |  |                                                                    |
//! |     v           |  |   (2) get next global unique seq                                   |
//! | Acquirer <--------------------------------------.                                       |
//! |   | | |         |  |                            |                                       |
//! |   | | |         |  |   (3) register             |   Meta-Service                        |
//! |   | | '----------------------------------+-->   '-> <prefix>/seq_generator   -> ''      |
//! |   | |           |  |                     |          <prefix>/queue/<seq_1> -> id_1 --+  |
//! |   | '-> spawn()------.                   '-->   .-> <prefix>/queue/<seq_2> -> id_2 --+--'
//! |   |     (4)     |  | |                          |   <prefix>/queue/<seq_3> -> id_3 --'
//! |   v             |  | |                          |
//! |   +<------------|--' |                          |
//! |   | (5)         |    v                          |
//! |   |             |    Leaser (task) -------------'
//! |   |             |    o             extend lease
//! |   |             |    ^
//! |   v             |    |
//! | Guard +-o-------'    |
//! |       |   cancel     |
//! |       `-o------------'
//! |           cancel
//! ```

#[cfg(doc)]
use acquirer::Permit;

pub mod acquirer;
pub mod errors;
mod meta_event_subscriber;
mod queue;
mod semaphore;
mod storage;

/// The current version of the semaphore encoding protocol.
const CURRENT_VERSION: u8 = 1u8;

pub type PermitSeq = u64;

pub use semaphore::Semaphore;
pub use storage::PermitEntry;
pub use storage::PermitKey;

// TODO test: fake several events sequence, assume different initial_flush set, and check the final state to be consistent.
